Apprendre à utiliser des fichiers au format Parquet avec R
16/01/2025
Ce support ne couvre pas tous les aspects des traitements qu’il est possible de réaliser avec les fichiers au format Parquet mais il constitue une base sur laquelle s’appuyer si vous rencontrez des fichiers Parquet sur Cerise ou ailleurs.
Un nouveau format de données …
Des fichiers moins volumineux qu’en csv 500 Mo en Parquet vs 5 Go en csv
Des requêtes plus rapides et efficaces
Seulement les données nécessaires sont lues, pas tout le fichier
Des données conformes à la mise à disposition par le producteur (par exemple, plus de problème de codes communes…)
=> Un format très efficace pour l’analyse de données mais peu adapté à l’ajout de données en continu ou à la modification fréquente de données existantes.
L’Insee diffuse des données du recensement de la population au format Parquet
Voir le guide d’utilisation joint pour manipuler ces données
Premières diffusions sur data.gouv avec les bureaux de vote, les demandes de valeurs foncières, indicateurs pénaux…)
Prévisualisations des fichiers Parquet possibles avec le nouvel explorateur de données du SSP Cloud ou avec avec l’outil ParquetViewer.
library(arrow) # Le package arrow est nécessaire pour travailler avec des fichiers parquet
library(dplyr) # Pour utiliser dplyr
library(tictoc) # Pour le benchmarkPour l’exemple, nous allons prendre une table des eploitations du RA 2020 d’une centaine de MO qui contient 416 478 lignes et 255 colonnes.
Le résultat obtenu est un objet directement utilisable dans R (ici un data.frame).
Il est possible de sélectionner les colonnes que l’on souhaite importer dans R directement dans la fonction read_parquet :
Voyons l’écart avec la lecture d’un fichier rds :
=> Le temps nécessaire au chargement de la table est d’environ 6 secondes !
L’écart est significatif rien que sur la lecture (X 6).
RA2020 est un data.frame : on peut donc utiliser la syntaxe dplyr :
resultat <- RA2020 |>
filter(SIEGE_REG == "93") |>
group_by(SIEGE_DEP) |>
summarise(total_SAU = sum(SAU_TOT, na.rm = TRUE))
# A tibble: 6 × 2
SIEGE_DEP total_SAU
<chr> <dbl>
1 04 158946.
2 05 91979.
3 06 41141.
4 13 145713.
5 83 77785.
6 84 112888.Voici ci-dessous la syntaxe recommandée pour requêter un fichier parquet volumineux :
# Établir la connexion aux données
RA2020 <- open_dataset("data/RA2020_exploitations.parquet") |>
filter(SIEGE_REG == "93") |>
group_by(SIEGE_DEP) |>
summarise(total_SAU = sum(SAU_TOT, na.rm = TRUE)) |>
collect()=> Avec cette syntaxe, la requête va automatiquement utiliser les variables du fichier Parquet dont elle a besoin (en l’occurence SIEGE_REG, SIEGE_DEP et SAU_TOT) et minimiser l’occupation de la mémoire vive.
Revenons dans le détail sur cette syntaxe…
open_dataset() (1/4)
Comme la fonction read_parquet(), la fonction open_dataset() permet de lire des données stockées en format Parquet.
Le résultat obtenu avec la fonction open_dataset() n’est plus un data.frame mais un Arrow Table qui est une structure de données spécifique.
open_dataset() (2/4)La fonction open_dataset() crée un objet qui apparaît dans Values.
L’affichage dans la console d’un Arrow Table affiche uniquement les métadonnées.
open_dataset() (3/4)Pour afficher le contenu d’un Arrow Table, il faut d’abord le convertir en data.frame avec la fonction collect().
RA2020 <- RA2020 |> collect()
class(RA2020)
> [1] "data.frame"
# L'opération ci-dessus est à éviter pour des tables volumineuses, si besoin de visualiser la table, on préfèrera :
extrait_RA2020 <- RA2020 |> slice_head(n = 100) |> collect()Toutefois rien ne presse car la grande différence entre manipuler un data.frame et un Arrow Table tient au moteur d’exécution :
Si on manipule un data.frame avec la syntaxe de dplyr, alors c’est le moteur d’exécution de dplyr qui fait les calculs
Si on manipule un Arrow Table avec la syntaxe de dplyr, alors c’est le moteur d’exécution d’arrow (nommé acero) qui fait les calculs. Et le moteur d’exécution d’arrow est beaucoup plus efficace et rapide
open_dataset() (4/4)
Il est recommandé de privilégier la fonction open_dataset() à la fonction read_parquet() pour au moins 2 raisons :
open_dataset() crée une connexion au fichier Parquet mais elle n’importe pas les données contenues dans ce fichier => une consommation de RAM moins importante !
open_dataset() peut se connecter à un fichier Parquet unique mais aussi à des fichiers Parquets partitionnés (voir plus loin)
Cela signifie qu’arrow se contente de mémoriser les instructions, sans faire aucun calcul tant que l’utilisateur ne le demande pas explicitement.
Il existe 2 fonctions pour déclencher l’évaluation d’un traitement arrow mais qui présente des différences :
collect() qui renvoie le résultat du traitement sous la forme d’un data.frame/tibblecompute() qui renvoie le résultat du traitement sous la forme d’un Arrow Table.La grande différence entre manipuler un tibble et manipuler un Arrow Table tient au moteur d’exécution :
Dans les traitements intermédiaires, on privilégiera la fonction compute() pour pouvoir utiliser le plus possible le moteur acero.
SAU_DEP <- RA2020 |>
group_by(SIEGE_DEP) |>
summarise(total_SAU = sum(SAU_TOT, na.rm = TRUE))
class(SAU_DEP)
> [1] "arrow_dplyr_query"
resultats <- SAU_DEP |>
filter(SIEGE_DEP == "13") |>
collect()
> # A tibble: 1 × 2
SIEGE_DEP total_SAU
<chr> <dbl>
1 13 145713.Dans l’exemple ci-dessus, la première étape ne réalise aucun calcul par elle-même, car elle ne comprend ni collect() ni compute(). L’objet SAU_DEP n’est pas une table et ne contient pas de données, il contient simplement une requête (query) décrivant les opérations à mener sur la table du RA.
arrow analyse la requête avant de l’exécuter, et optimise le traitement pour minimiser le travail.
Dans notre exemple, arrow repère que la requête ne porte en fait que sur le département 13, et commence donc par filtrer les données sur le département avant de sommer la SAU les équipements, de façon à ne conserver que le minimum de données nécessaires et à ne réaliser que le minimum de calculs.
L’évaluation/exécution différée est très puissante mais présente des limites.
On serait tentés d’écrire un traitement entier en mode lazy (sans aucun compute() ni collect() dans les étapes intermédiaires) et de faire un unique compute() ou collect() tout à la fin du traitement afin que toutes les opérations soient optimisées en une seule étape.
Malheureusement, le moteur acero a ses limites notamment sur des traitements trop complexes (ce qui génère des plantages de sessions R).
QUELQUES CONSEILS POUR ÉLABORER LA BONNE STRATÉGIE AVEC L’ÉVALUATION DIFFÉRÉE :
compute())La liste des fonctions du tidyverse supportées par acero est disponible sur cette page.
Il y a (encore) quelques grands absents, notamment :
pivot_wider() et pivot_longer() n’ont pas d’équivalent avec acero.
les empilements de plusieurs tables avec une seule fonction (bind_rows() dans dplyr).
Avec des Arrow Tables, il faut appeler plusieurs fois ces fonctions (en l’occurence union(). Par exemple :
res <- RA2020 |>
group_by(SIEGE_REG) |>
mutate(total_SAU = sum(SAU_TOT)) |>
collect()
> Error: window functions not currently supported in Arrow
Call collect() first to pull data into R.Remarque : le code ci-dessus fonctionne par contre en remplaçant le mutate() par un summarise().
Plusieurs solutions existent :
collect() et poursuivre le traitement avec le moteur d’exécution de dplyr (avec des performances moins importantes).Exemple pour le point 2 issu d’utilitr :
resultats <- bpe_ens_2018_arrow |>
group_by(DEP) |>
summarise(
nb_boulangeries = sum(NB_EQUIP * (TYPEQU == "B203")),
nb_poissonneries = sum(NB_EQUIP * (TYPEQU == "B206"))
) |>
compute()
> ! NotImplemented: Function 'multiply_checked' has no kernel matching input types (double, bool); pulling data into RL’erreur vient de l’opération sum(NB_EQUIP * (TYPEQU == “B203”)) : arrow ne parvient pas à faire la multiplication entre NB_EQUIP (un nombre réel) et (TYPEQU == “B203”) (un booléen).
=> La solution est très simple: il suffit de convertir (TYPEQU == “B203”) en nombre entier avec la fonction as.integer() qui est supportée par acero.
Le code suivant peut alors être entièrement exécuté par acero:
Le package arrow présente 3 avantages majeurs :
Performances élevées : arrow est très efficace et très rapide pour la manipulation de données tabulaires (nettement plus performant que dplyr par exemple)
Usage réduit des ressources : arrow est conçu pour ne charger en mémoire que le minimum de données. Cela permet de réduire considérablement les besoins en mémoire, même lorsque les données sont volumineuses
Facilité d’apprentissage grâce aux approches dplyr et SQL: arrow peut être utilisé avec les verbes de dplyr (select, mutate, etc.) et/ou avec le langage SQL grâce à DuckDB (voir plus loin).
Exercice 1 (premiers contacts avec un fichier parquet + rappels sur les fonctions)
Ouvrir le fichier parquet situé sous ~/CERISE/03-Espace-de-Diffusion/030_Structures_exploitations/3020_Recensements/RA_2020/01_BASES DIFFUSION RA2020/RA_2020_parquet/RA2020_EXPLOITATIONS_240112.parquet
Consulter les métadonnées de ce fichier
Consulter les 100 premières lignes de ce fichier
Récupérer dans un vecteur trié les codes régions des lieux principaux de production (SIEGE_REG)
Récupérer dans un vecteur trié les libellés régions des lieux principaux de production (SIEGE_LIB_REG)
Ecrire une fonction calculs_RA() qui - pour une région et une table donnée en entrée - conserve uniquement les lignes correspondantes selon la colonne SIEGE_REG, puis groupe la table par SIEGE_DEP et calcule la surface totale SAU (SAU_TOT), la surface totale de céréales (CEREALES_SUR_TOT) et la surface totale d’oléagineux (OLEAG_SUR_TOT) et enfin la part de la surface des cereales dans la SAU totale et la part de la surface des oléagineux dans la SAU totale.
Utilser ensuite la fonction calculs_RA() pour calculer ces indicateurs sur l’ensemble des régions présentes dans la table du RA2020 et stocker les résultats dans des fichiers Excel sous votre espace personnel.
TIPS : pensez à utiliser {purrr} et {openxlsx} par exemple.
Exercice 2 (collect() vs compute())
data_a <- tibble(
id = rep(1:1000000, each = 10),
annee = rep(2016:2025, times = 1000000),
a = sample(letters, 10000000, replace = TRUE)
)
data_b <- tibble(
id = rep(1:1000000, each = 10),
annee = rep(2016:2025, times = 1000000),
b = runif(10000000, 1, 100)
)
data_c <- tibble(
lettres = sample(letters, 10000000, replace = TRUE),
classe = sample(c("pommes","poires","melon","fraise"), 10000000, replace = TRUE)
)
write_parquet(data_a, "data_a.parquet")
write_parquet(data_b, "data_b.parquet")
write_parquet(data_c, "data_c.parquet")
rm(data_a)
rm(data_b)
rm(data_c)
gc()Exercice 2 (collect() vs compute())
collect()Charger les fichiers parquet data_a et data_b sous forme de data.frame
Créer la table etape1 en réalisant une jointure à gauche de data_a avec data_b.
Charger le fichier parquet data_c sous forme de data.frame
Filtrer la table etape1 sur les années supérieures à 2020 puis faire la somme de la colonne b selon la colonne a
Ajouter le colonne classe issue de la table data_c dans le tableau final.
compute()Réaliser les mêmes traitements que A) avec des compute() et réduire le temps d’exécution.
En tant que responsable de sources, vous pouvez être amenés à écrire et déposer des fichiers Parquet, par exemple sous Cerise.
Pour cela, on utilise la fonction write_parquet().
Un 1er exemple simple à partir d’un fichier rds:
# Lecture du fichier rds
msa_ns <- readRDS("data/msa_ns_src_2023.rds")
# Écriture des données en format Parquet
write_parquet(x = msa_ns, sink = "data/msa_ns_src_2023.parquet")Un autre exemple un peu plus compliqué à partir de fichier csv contenu dans un zip sur internet :
# Chargement des packages
library(arrow)
library(readr)
# Téléchargement du fichier zip
download.file("https://www.insee.fr/fr/statistiques/fichier/2540004/dpt2021_csv.zip", destfile = "data/dpt2021_csv.zip")
# Décompression du fichier zip
unzip("data/dpt2021_csv.zip", exdir = "data")
# Lecture du fichier CSV
dpt2021 <- read_delim(file = "data/dpt2021.csv")
# Écriture des données en format Parquet
write_parquet(x = dpt2021, sink = "data/dpt2021.parquet"))
Pourquoi partitionner ?
Par définition, il n’est pas possible de charger seulement quelques lignes d’un fichier Parquet : on importe nécessairement des colonnes entières.
Lorsque le fichier Parquet est partitionné, arrow est capable de filtrer les lignes à importer à l’aide de clés departitionnement, ce qui permet d’accélérer l’importation des données.
Le partitionnement permet de travailler sur des fichiers Parquet de plus petite taille et donc de consommer moins de mémoire vive.
Ça veut dire quoi partitionné ?
Partitionner un fichier revient à le “découper” selon une clé de partionnement (une ou plusieurs variables)
En pratique, l’ensemble des données sera stockée dans plusieurs fichiers au format Parquet.
Voici par exemple comment se présente un fichier Parquet partitionné selon les régions :
Pour écrire des fichiers Parquet partitionnés, on utilise la fonction write_dataset().
Partitionnons notre fichier issu de la MSA par type d’exploitation et sexe :
write_dataset(
dataset = msa_ns,
path = "data/msa_ns",
partitioning = c("TYPE_EXP","SEXE"), # les variables de partitionnement
format = "parquet"
)Voici un aperçu de l’arborescence créée (:
Le package R parquetize permet de faciliter la conversion de données au format Parquet.
Plusieurs formats supportés csv, json, rds, fst, SAS, SPSS, Stata, sqlite…
Propose des solutions de contournement pour les fichiers très volumineux.
Un exemple issu de la documentation :
Conversion from a local rds file to a partitioned parquet file :: 12
rds_to_parquet(
path_to_file = system.file("extdata","iris.rds",package = "parquetize"),
path_to_parquet = tempfile(fileext = ".parquet"),
partition = "yes",
partitioning = c("Species")
)
#> Reading data...
#> Writing data...
#> ✔ Data are available in parquet dataset under /tmp/RtmptNiaDm/file1897441ca0c0.parquet
#> Writing data...
#> Reading data...La fonction open_dataset() permet d’ouvrir une connexion vers un fichier Parquet partitionné.
L’utilisation de la fonction open_dataset() est similaire au cas dans lequel on travaille avec un seul fichier Parquet.
Il y a toutefois 2 différences :
Un exemple avec les données de la MSA :
# Établir la connexion au fichier Parquet partitionné
donnees_msa <- open_dataset(
"data/msa_ns", # Ici, on met le chemin d'un répertoire
hive_style = TRUE,
partitioning = arrow::schema(TYPE_EXP = arrow::utf8(), SEXE = arrow::utf8()) # Les variables de partitionnement
)
# Définir la requête
resultats_msa <- donnees_msa |>
filter(TYPE_EXP == "2" & SEXE == "1") |> # Ici, on filtre selon les clés de partitionnement
select(DEPT, RC_CHEF) |>
collect()Ce qui donne :
Afin de tirer au mieux profit du partitionnement, il est conseillé de filtrer les données de préférence selon les variables de partitionnement (dans notre exemple, TYP_EXP et SEXE).
Il est fortement recommandé de spécifier le type des variables de partitionnement avec l’argument partitioning.
Cela évite des erreurs typiques: le code du département est interprété à tort comme un nombre et aboutit à une erreur à cause de la Corse…
L’argument partitioning s’utilise en construisant un schéma qui précise le type de chacune des variables de partitionnement.
Voir cette page pour la liste des types supportés.
arrowIl est recommandé de définir les deux options suivantes au début de votre script.
Cela autorise arrow à utiliser plusieurs processeurs à la fois, ce qui accélère les traitements :
Pour ceux qui veulent aller plus loin :
arrow ? C’est par iciNote interne écrite par le DEMESIS : voir ici
2 Comment utiliser/interroger un fichier parquet ?